JavaよりEMRを起動してHiveスクリプトを実行する
はじめに
アプリケーションからEMRを起動し、Hiveスクリプトを実行して結果を出力した場合があるかと思います。今回はこのケースに対応するサンプルをJavaにて実装してみました。Spring Boot + AWS SDK for Javaを用いて実装しましたが、その他の言語でも考え方は応用できるかと思います。
実装について
処理の流れ
まずは今回のサンプルの処理の流れについてです。以下のようになります。
- EMRクラスターを起動する
- クラスターにステップを追加してHiveスクリプトを実行する
- 実行完了後にクラスターを終了する
抽出対象データ
今回のサンプルではCSVファイルをEMR(Hadoop)に取込み、任意の年月のデータを抽出しました。元データとなるCSVは以下の様になります。
2010,01,01,user_01,remarks 01 2010,01,02,user_02,remarks 02 2010,01,03,user_03,remarks 03 2010,01,04,user_01,remarks 04 2010,01,05,user_02,remarks 05 2010,01,06,user_03,remarks 06 2010,01,07,user_01,remarks 07 2010,01,08,user_02,remarks 08 2010,01,09,user_03,remarks 09 2010,01,10,user_01,remarks 10 2010,02,01,user_02,remarks 11 2010,02,02,user_03,remarks 12 2010,02,03,user_01,remarks 13 2010,02,04,user_02,remarks 14 2010,02,05,user_03,remarks 15 2010,02,06,user_01,remarks 16 2010,02,07,user_02,remarks 17 2010,02,08,user_03,remarks 18 2010,02,09,user_01,remarks 19 2010,02,10,user_02,remarks 20 2010,03,01,user_03,remarks 21 2010,03,02,user_01,remarks 22 2010,03,03,user_02,remarks 23 2010,03,04,user_03,remarks 24 2010,03,05,user_01,remarks 25 2010,03,06,user_02,remarks 26 2010,03,07,user_03,remarks 27 2010,03,08,user_01,remarks 28 2010,03,09,user_02,remarks 29 2010,03,10,user_03,remarks 30
このCSVファイルは、予めS3の任意のバケット内に保存しておく必要があります(後ほどHiveスクリプトで入力データのパスを指定する)。
build.gradle
では、Javaの実装についてです。Spring Boot + AWS SDK for Javaを使用するため、build.gradleは以下のようになりました。
build.gradle
buildscript { ext { springBootVersion = '1.4.3.RELEASE' } repositories { mavenCentral() } dependencies { classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}") } } apply plugin: 'java' apply plugin: 'eclipse' apply plugin: 'org.springframework.boot' jar { baseName = 'springBootEmrHiveSample' version = '0.0.1-SNAPSHOT' } sourceCompatibility = 1.8 repositories { mavenCentral() } dependencies { compile('org.springframework.boot:spring-boot-starter') compile group: 'com.amazonaws', name: 'aws-java-sdk', version: '1.11.75' testCompile('org.springframework.boot:spring-boot-starter-test') }
Application.java
次にmain()を持つApplicationクラスです。今回のサンプルではrun()メソッド内で処理順序を制御します。AWS SDK for Javaを用いてのEMRの操作は後述するEMRクラス内で実装し、ここではEMRクラスのメソッドを呼び出すのみです。
Application.java
package com.example; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.ConfigurableApplicationContext; @SpringBootApplication public class Application { public static void main(String[] args) { try (ConfigurableApplicationContext ctx = SpringApplication .run(Application.class, args)) { Application app = ctx.getBean(Application.class); app.run(args); } catch (Exception e) { e.printStackTrace(); } } public void run(String... args) throws Exception { System.out.println("処理開始"); Emr emr = new Emr(); //EMRクライアント初期化 emr.init(); //JobFlowの起動 String jobFlowId = emr.runJobFlow(); //クラスタの起動待機 while (true) { if (emr.getClusterStatus(jobFlowId).equals("WAITING")) break; System.out.println("Cluster creating."); Thread.sleep(10000); } //ステップの追加 String stepId = emr.addStep(jobFlowId); //ステップの処理完了待機 while (true) { if (emr.getStepStatus(jobFlowId, stepId).equals("COMPLETED")) break; System.out.println("Step running."); Thread.sleep(10000); } //JobFlowの終了 emr.terminateJobFlow(jobFlowId); System.out.println("処理終了"); } }
「JobFlowの起動」でEMRクラスターを立ち上げ、起動するまで待機します。次にステップを追加してHiveスクリプトを実行し、処理が完了するまで待機します。最後に「JobFlowの終了」でEMRクラスターを終了します。
Emr.java
EMRを実際に操作するクラスです。先ほどのApplicationクラス内のメソッドの呼び出し元と対比して見ると分かり易いかと思います。
Emr.java
package com.example; import java.util.ArrayList; import java.util.List; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.profile.ProfileCredentialsProvider; import com.amazonaws.regions.Region; import com.amazonaws.regions.Regions; import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient; import com.amazonaws.services.elasticmapreduce.model.ActionOnFailure; import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsRequest; import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsResult; import com.amazonaws.services.elasticmapreduce.model.Application; import com.amazonaws.services.elasticmapreduce.model.DescribeClusterRequest; import com.amazonaws.services.elasticmapreduce.model.DescribeClusterResult; import com.amazonaws.services.elasticmapreduce.model.DescribeStepRequest; import com.amazonaws.services.elasticmapreduce.model.DescribeStepResult; import com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig; import com.amazonaws.services.elasticmapreduce.model.JobFlowInstancesConfig; import com.amazonaws.services.elasticmapreduce.model.PlacementType; import com.amazonaws.services.elasticmapreduce.model.RunJobFlowRequest; import com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult; import com.amazonaws.services.elasticmapreduce.model.StepConfig; import com.amazonaws.services.elasticmapreduce.model.TerminateJobFlowsRequest; public class Emr { private final String AWS_PROFILE = "your_profile"; //AWSクレデンシャルのプロファイル名 private final String EC2_KEY_NAME = "your_ec2_keypair_name"; //EC2のキーペア名 private final String LOG_URI = "s3://xxxxxx/logs"; //ログの出力先となるS3フォルダパス private final String QUERY_PATH = "s3://xxxxxx/query.sql"; //実行するHiveスクリプトのS3上のフルパス private final String INPUT_PATH = "s3://xxxxxx/input/"; //EMRに取り込む元データを格納したS3フォルダパス private final String OUTPUT_PATH = "s3://xxxxxx/output/"; //Hiveスクリプトの実行結果を出力するS3フォルダパス private final String CLUSTER_NAME = "clusterName"; private final String EMR_VERSION = "emr-5.2.1"; private final String SERVICE_ROLE = "EMR_DefaultRole"; private final String JOB_FLOW_NAME = "EMR_EC2_DefaultRole"; private final String MASTER_INSTANCE_TYPE = "m3.xlarge"; private final String SLAVE_INSTANCE_TYPE = "m3.xlarge"; private final String STEP_NAME = "Hive Program"; private AmazonElasticMapReduceClient emrClient; public void init() { AWSCredentialsProvider credentialsProvider = new ProfileCredentialsProvider( AWS_PROFILE); emrClient = new AmazonElasticMapReduceClient(credentialsProvider); emrClient.setRegion(Region.getRegion(Regions.AP_NORTHEAST_1)); } public String runJobFlow() { Application hive = new Application(); hive.withName("Hive"); RunJobFlowRequest request = new RunJobFlowRequest() .withName(CLUSTER_NAME) .withApplications(hive).withLogUri(LOG_URI) .withReleaseLabel(EMR_VERSION) .withServiceRole(SERVICE_ROLE) .withJobFlowRole(JOB_FLOW_NAME) .withVisibleToAllUsers(true) .withInstances(new JobFlowInstancesConfig().withInstanceCount(2) .withKeepJobFlowAliveWhenNoSteps(true) .withPlacement(new PlacementType() .withAvailabilityZone("ap-northeast-1a")) .withEc2KeyName(EC2_KEY_NAME) .withMasterInstanceType(MASTER_INSTANCE_TYPE) .withSlaveInstanceType(SLAVE_INSTANCE_TYPE)); RunJobFlowResult result = emrClient.runJobFlow(request); return result.getJobFlowId(); } public String getClusterStatus(String jobFlowId) { DescribeClusterResult result = emrClient.describeCluster( new DescribeClusterRequest().withClusterId(jobFlowId)); return result.getCluster().getStatus().getState(); } public String addStep(String jobFlowId) { AddJobFlowStepsResult result = emrClient .addJobFlowSteps(new AddJobFlowStepsRequest() .withJobFlowId(jobFlowId).withSteps(buildStepConfig())); return result.getStepIds().get(0); } public String getStepStatus(String jobFlowId, String stepId) { DescribeStepResult result = emrClient .describeStep(new DescribeStepRequest().withClusterId(jobFlowId) .withStepId(stepId)); return result.getStep().getStatus().getState(); } public void terminateJobFlow(String jobFlowId) { emrClient.terminateJobFlows( new TerminateJobFlowsRequest().withJobFlowIds(jobFlowId)); } private List<StepConfig> buildStepConfig() { List<StepConfig> result = new ArrayList<>(); String[] args = { "hive-script", "--run-hive-script", "--args", "-f", QUERY_PATH, "-d", "INPUT=" + INPUT_PATH, "-d", "OUTPUT=" + OUTPUT_PATH, "-d", "YYYY=2010", "-d", "MM=01" }; StepConfig step = new StepConfig().withName(STEP_NAME) .withActionOnFailure(ActionOnFailure.CONTINUE) .withHadoopJarStep(new HadoopJarStepConfig() .withJar("command-runner.jar").withArgs(args)); result.add(step); return result; } }
ソースの上部にある定数には適切な値を設定してください。100行目から始まるbuildStepConfig()でステップに渡す引数を設定しています。設定方法は「-d 引数名=値」となるよう指定します。
なお、このメソッド内で指定した値がマネージメントコンソール上のステップに以下のように表示されます。
なので開発時には、先にマネージメントコンソール上でステップを実行しつつHiveスクリプトや引数を決め、ここに表示されている値を元にプログラムから引数を指定することも可能です。
Hiveスクリプト
今回実行するHiveスクリプトとなります。
DROP TABLE IF EXISTS csv_input; CREATE EXTERNAL TABLE csv_input ( yyyy string, mm string, dd string, user_name string, remarks string ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' LOCATION '${INPUT}'; DROP TABLE IF EXISTS csv_output; CREATE EXTERNAL TABLE csv_output ( yyyy string, mm string, dd string, remarks string ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE LOCATION '${OUTPUT}'; FROM csv_input input INSERT OVERWRITE TABLE csv_output SELECT input.yyyy, input.mm, input.dd, input.remarks WHERE input.yyyy = ${YYYY} AND input.mm = ${MM};
Hiveスクリプトでは引数は「${引数名}」の書式で指定します。ここの引数名の具体的な値として、先のJavaで指定した値が設定されます。
まとめ
以上でJavaからEMRクラスターを起動してHiveスクリプトを実行することができました。私個人としては、マネージメントコンソール上の表示とJavaのソースの関係が分かったことで、より理解が深まりました。プログラムからEMRクラスターを起動する際などの参考になれば幸いです。
参考サイト
以下のサイトを参考にさせて頂きました。ありがとうございました。